package com.lianda.connectors.utils;

import com.google.gson.Gson;
import com.lianda.model.MetricEvent;
import com.lianda.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 往Kafka中写数据
 */
public class KafkaUtil {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "metric";  // kafka topic，Flink 程序中需要和这个统一

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_list);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        MetricEvent metric = new MetricEvent();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        Map<String, String> tags = new HashMap<>();
        Map<String, Object> fields = new HashMap<>();

        tags.put("cluster", "lianda");
        tags.put("host_ip", "192.168.1.5");

        fields.put("used_percent", 90d);
        fields.put("max", 27244873d);
        fields.put("used", 17244873d);
        fields.put("init", 27244873d);

        metric.setTags(tags);
        metric.setFields(fields);

        ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, GsonUtil.toJson(metric));
        producer.send(record);

        System.out.println("发送数据: " + GsonUtil.toJson(metric));
        producer.flush();
    }

    //写数据到kafka
    public static void writeStudentToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        for (int i = 1; i <= 2; i++) {
            Student student = new Student(i, "lianda_" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord<String, String>("student", null, null, GsonUtil.toJson(student));
            producer.send(record);
            System.out.println("发送数据: " + GsonUtil.toJson(student));
        }

        Thread.sleep(3000);
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
//        int i = 0;
//        while (i < 20) {
//            Thread.sleep(3000);
//            writeToKafka();
//            i++;
//        }

        writeStudentToKafka();
    }
}
